Assignment 03

Author
Affiliation

Devin Blanchard

Boston University

Published

September 24, 2025

Modified

October 17, 2025

1 Data Loading and Inspection

import pandas as pd
import plotly.express as px
import plotly.io as pio
from pyspark.sql import SparkSession
import re
import numpy as np
import plotly.graph_objects as go
from pyspark.sql.functions import col, split, explode, regexp_replace, transform, when
from pyspark.sql import functions as F
from pyspark.sql.functions import col, monotonically_increasing_id

# --- Reproducibility and rendering setup ---
np.random.seed(42)
pio.renderers.default = "notebook"

# --- Spark session (reused if one already exists in this process) ---
spark = (
    SparkSession.builder
    .appName("LightcastData")
    .getOrCreate()
)

# --- Raw Lightcast postings ---
# multiLine lets a quoted record span several physical lines; escape='"'
# treats doubled quotes inside a quoted field as a literal quote.
df = (
    spark.read
    .options(header="true", inferSchema="true", multiLine="true", escape='"')
    .csv("./data/lightcast_job_postings.csv")
)

# Register the *uncleaned* frame for Spark SQL queries as `job_postings`.
df.createOrReplaceTempView("job_postings")

# Optional diagnostics, disabled for the rendered submission:
# df.printSchema()
# df.show(5)
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/17 21:33:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 1:>                                                          (0 + 1) / 1]                                                                                25/10/17 21:33:22 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

2 Data Cleaning

from pyspark.sql.functions import coalesce, col, lit, regexp_replace

# Cast salary and experience fields to numeric so they can be aggregated.
for numeric_col in (
    "SALARY",
    "SALARY_FROM",
    "SALARY_TO",
    "MIN_YEARS_EXPERIENCE",
    "MAX_YEARS_EXPERIENCE",
):
    df = df.withColumn(numeric_col, col(numeric_col).cast("float"))

# Approximate medians (1% relative error) used as imputation fallbacks.
median_from = df.approxQuantile("SALARY_FROM", [0.5], 0.01)[0]
median_to = df.approxQuantile("SALARY_TO", [0.5], 0.01)[0]
median_salary = df.approxQuantile("SALARY", [0.5], 0.01)[0]

print("Medians:", median_from, median_to, median_salary)

# Impute only the salary bounds here. SALARY must stay null at this point:
# filling it with its median now would make the AVERAGE_SALARY fallback
# below unreachable.
df = df.fillna({
    "SALARY_FROM": median_from,
    "SALARY_TO": median_to,
})

# Midpoint of the (imputed) posted salary range.
df = df.withColumn(
    "AVERAGE_SALARY", (col("SALARY_FROM") + col("SALARY_TO")) / 2
)

# Impute SALARY with the first non-null of:
#   SALARY -> AVERAGE_SALARY -> overall median salary.
df = df.withColumn(
    "SALARY",
    coalesce(col("SALARY"), col("AVERAGE_SALARY"), lit(median_salary)),
)

# Strip embedded line breaks from the education text so it stays one line.
df = df.withColumn(
    "EDUCATION_LEVELS_NAME",
    regexp_replace(col("EDUCATION_LEVELS_NAME"), r'[\n\r]', '')
)

# Persist the cleaned data (Spark writes a directory of part files at this path).
df.write.option("header", True).mode("overwrite").csv("data/lightcast_job_postings_cleaned.csv")

# No rows are dropped by the steps above; this confirms the count.
print(f"Rows retained after cleaning: {df.count()}")
[Stage 2:>                                                          (0 + 1) / 1]                                                                                [Stage 3:>                                                          (0 + 1) / 1]                                                                                [Stage 4:>                                                          (0 + 1) / 1]                                                                                
Medians: 87295.0 130042.0 115024.0
[Stage 5:>                                                          (0 + 1) / 1]                                                                                [Stage 6:>                                                          (0 + 1) / 1]
Rows retained after cleaning: 72498
                                                                                

3 Salary Distribution by Industry and Employment Type

# Salary per industry (NAICS2_NAME). The `> 0` predicate also discards
# null salaries, since a null comparison never passes a Spark filter.
plot_df = (
    df.filter(col("SALARY") > 0)
    .select("NAICS2_NAME", "SALARY")
    .toPandas()
)

# Build, style and lay out the box plot in one chained expression;
# update_traces / update_layout return the same figure object.
fig = (
    px.box(
        plot_df,
        x="NAICS2_NAME",
        y="SALARY",
        points="all",
        title="Salary Distribution by Industry",
    )
    .update_traces(
        # Every posting is drawn as a point centred on its box (no jitter).
        marker=dict(color="rgb(52,152,219)", opacity=0.5, size=4),
        line=dict(color="rgb(41,128,185)"),
        fillcolor="rgba(41,128,185,0.3)",
        jitter=0,
        pointpos=0,
    )
    .update_layout(
        xaxis_title="Industry",
        yaxis_title="SALARY",
        font=dict(size=14, family="Arial"),
        plot_bgcolor="#F4F8FF",
        paper_bgcolor="#F4F8FF",
        xaxis_tickangle=-90,
        height=1200,
        width=700,
        showlegend=False,
        boxmode="overlay",
    )
)

fig.show()
[Stage 9:>                                                          (0 + 1) / 1]                                                                                

Salaries vary widely across industries: sectors such as Information and Finance and Insurance generally show higher salary ranges than industries such as Accommodation and Food Services. Full-time positions tend to have higher median salaries than part-time or other employment types across most industries. Note, however, that employment type is not encoded in the chart above, so this comparison requires a separate breakdown by employment type to be seen.

4 Salary Analysis by LOT Occupation Type (Bubble Chart)

# Ten most-posted LOT occupations with their median salary.
# NOTE: `job_postings` is the temp view registered right after loading, i.e.
# the raw data; the casts/imputation applied to `df` later are not reflected.
# PERCENTILE computes an exact (not approximate) percentile.
salary_analysis = spark.sql(
    """
    SELECT
        LOT_OCCUPATION_NAME     AS OCCUPATION_NAME,
        PERCENTILE(SALARY, 0.5) AS Median_Salary,
        COUNT(*)                AS Job_Postings
    FROM job_postings
    GROUP BY LOT_OCCUPATION_NAME
    ORDER BY Job_Postings DESC
    LIMIT 10
    """
)

salary_pd = salary_analysis.toPandas()
salary_pd.head()

# Bubble size and colour both encode posting volume.
fig = px.scatter(
    salary_pd,
    x="OCCUPATION_NAME",
    y="Median_Salary",
    size="Job_Postings",
    size_max=60,
    color="Job_Postings",
    color_continuous_scale="Viridis",
    hover_name="OCCUPATION_NAME",
    labels={
        "OCCUPATION_NAME": "LOT Occupation",
        "Median_Salary": "Median Salary",
        "Job_Postings": "Number of Job Postings",
    },
    title="Salary Analysis by LOT Occupation Type (Bubble Chart)",
    width=1000,
    height=600,
)

# Global typography and background.
fig.update_layout(
    font=dict(family="Arial", size=14),
    title_font=dict(size=25),
    plot_bgcolor="#f6f9fa",
)

# Axis titles, ticks and axis lines.
fig.update_xaxes(
    title_text="LOT Occupation",
    title_font=dict(size=17),
    tickangle=-60,
    showline=True,
    linecolor="#444",
)
fig.update_yaxes(
    title_text="Median Salary",
    title_font=dict(size=17),
    showline=True,
    linecolor="#444",
)

fig.show()
[Stage 10:>                                                         (0 + 1) / 1]                                                                                

5 Salary by Education Level

# Keywords (matched case-insensitively as substrings) that place a posting's
# EDUCATION_LEVELS_NAME into one of two education buckets.
lower_deg = ["Bachelor", "Associate", "GED", "No Education Listed", "High school"]
higher_deg = ["Master", "PhD", "Doctorate", "professional degree"]

# A leading (?i) makes every alternative of the pattern case-insensitive.
# The lower bucket is tested first, so text mentioning both a Bachelor and a
# Master keyword is labelled "Bachelor's or lower".
df = (
    df.withColumn(
        "EDU_GROUP",
        when(
            col("EDUCATION_LEVELS_NAME").rlike("(?i)" + "|".join(lower_deg)),
            "Bachelor's or lower",
        )
        .when(
            col("EDUCATION_LEVELS_NAME").rlike("(?i)" + "|".join(higher_deg)),
            "Master's or PhD",
        )
        .otherwise("Other"),
    )
    # Re-cast to float. Because Spark resolves column names case-insensitively
    # by default, the second call replaces AVERAGE_SALARY and the result is
    # named "Average_Salary" (the name the plot below relies on).
    .withColumn("MAX_YEARS_EXPERIENCE", col("MAX_YEARS_EXPERIENCE").cast("float"))
    .withColumn("Average_Salary", col("Average_Salary").cast("float"))
    # NOTE(review): this rebinds the notebook-wide `df`, so any later cell only
    # sees rows with positive experience and salary.
    .filter(
        col("MAX_YEARS_EXPERIENCE").isNotNull()
        & col("Average_Salary").isNotNull()
        & (col("MAX_YEARS_EXPERIENCE") > 0)
        & (col("Average_Salary") > 0)
    )
)

# Keep only the two buckets being compared and bring them to pandas.
df_filtered = df.filter(
    col("EDU_GROUP").isin("Bachelor's or lower", "Master's or PhD")
)
df_pd = df_filtered.toPandas()
df_pd.head()

# Experience vs. salary, coloured by education bucket (green / purple),
# with black-outlined markers.
fig1 = (
    px.scatter(
        df_pd,
        x="MAX_YEARS_EXPERIENCE",
        y="Average_Salary",
        color="EDU_GROUP",
        hover_data=["LOT_V6_SPECIALIZED_OCCUPATION_NAME"],
        title="<b>Experience vs Salary by Education Level</b>",
        opacity=0.7,
        color_discrete_sequence=["#36B37E", "#A259EC"],
    )
    .update_traces(marker=dict(size=7, line=dict(width=1, color="black")))
    .update_layout(
        plot_bgcolor="#f9f9f9",
        paper_bgcolor="#EAF7FF",
        font=dict(family="Segoe UI", size=14),
        title_font=dict(size=22),
        xaxis_title="Years of Experience",
        yaxis_title="Average Salary (USD)",
        legend_title="Education Group",
        hoverlabel=dict(bgcolor="white", font_size=13, font_family="Arial"),
        margin=dict(t=70, b=60, l=60, r=60),
        # One tick per year of experience.
        xaxis=dict(gridcolor="lightgrey", tickmode="linear", dtick=1, zeroline=False),
        yaxis=dict(gridcolor="lightgrey", zeroline=False),
    )
)

fig1.show()
[Stage 13:>                                                         (0 + 1) / 1]